大規模データの個人情報の削除、S3ストリーミングを活用したPythonスクリプトの実装

大規模データの個人情報の削除、S3ストリーミングを活用したPythonスクリプトの実装

Clock Icon2024.10.12

AWS事業本部コンサルティング部の石川です。例えば、個人情報保護法の削除依頼に対応するため、データレイク上の特定のレコードを削除する必要があります。今回は、そのような状況を想定し、大容量のCSVファイルを効率的に処理するPythonスクリプトをサンプル実装しました。本番のデータで利用する場合は動作検証した後、ご利用ください。

課題と条件

処理対象のファイルは以下の特徴を持っています。

  • gzip圧縮済みのTSVファイル
  • S3上に保存されており、圧縮状態で1GB
  • メモリとディスク容量に制限があるため、ローカルにコピーして処理することは不可能

これらの制約の中で、特定の列の条件に一致するレコードを削除する必要があります。

解決策:ストリーミング処理

上記の課題に対応するため、以下の特徴を持つPythonスクリプトを実装しました:

  1. S3からのストリーミング読み込み
  2. gzipファイルの逐次解凍
  3. 条件に基づくレコードの処理
  4. 処理済みデータのS3へのストリーミングアップロード

サンプルコード

import boto3
import gzip
import csv
import io
from botocore.exceptions import ClientError

def process_s3_gzip_tsv(input_bucket, input_key, output_bucket, output_key, condition_col, condition_val):
    # ストリーミング用のバッファサイズ
    buffer_size = 5 * 1024 * 1024  # 5MB

    # 処理したレコード数とフィルタリングされたレコード数を追跡
    total_records = 0
    filtered_records = 0
    written_records = 0

    try:
        # 入力ファイルをストリーミングで読み込む
        response = s3_client.get_object(Bucket=input_bucket, Key=input_key)
        stream = response['Body']

        output_buffer = io.BytesIO()

        with gzip.GzipFile(fileobj=stream, mode='r') as gzip_file:
            reader = csv.reader(io.TextIOWrapper(gzip_file, encoding='utf-8'), delimiter='\t')

            # ヘッダーを処理
            header = next(reader)
            column_a_index = header.index(condition_col)

            # ヘッダーを書き込む
            output_buffer.write(('\t'.join(header) + '\n').encode('utf-8'))

            for row in reader:
                total_records += 1

                # 列Aの値が条件に一致しない場合、行を書き込む
                if row[column_a_index] != condition_val:
                    output_buffer.write(('\t'.join(row) + '\n').encode('utf-8'))
                    written_records += 1
                else:
                    filtered_records += 1

        # 圧縮してアップロード
        output_buffer.seek(0)
        compressed_data = gzip.compress(output_buffer.getvalue())

        if len(compressed_data) < 5 * 1024 * 1024:  # 5MB未満の場合は通常のアップロードを使用
            s3_client.put_object(Bucket=output_bucket, Key=output_key, Body=compressed_data)
        else:
            # マルチパートアップロードを開始
            mpu = s3_client.create_multipart_upload(Bucket=output_bucket, Key=output_key, ContentType='application/gzip')
            parts = []

            for i in range(0, len(compressed_data), buffer_size):
                part_number = i // buffer_size + 1
                part = s3_client.upload_part(
                    Bucket=output_bucket,
                    Key=output_key,
                    PartNumber=part_number,
                    UploadId=mpu['UploadId'],
                    Body=compressed_data[i:i+buffer_size]
                )
                parts.append({
                    'PartNumber': part_number,
                    'ETag': part['ETag']
                })

            # マルチパートアップロードを完了
            s3_client.complete_multipart_upload(
                Bucket=output_bucket,
                Key=output_key,
                UploadId=mpu['UploadId'],
                MultipartUpload={'Parts': parts}
            )

        print(f"処理完了: 合計 {total_records} レコード、フィルタリング {filtered_records} レコード、書き込み {written_records} レコード")

        # 出力ファイルが0バイトでないことを確認
        output_size = s3_client.head_object(Bucket=output_bucket, Key=output_key)['ContentLength']
        if output_size == 0:
            raise ValueError("出力ファイルが0バイトです。")
        else:
            print(f"出力ファイルサイズ: {output_size} バイト")

    except ClientError as e:
        print(f"エラー: {e}")
        raise

PROFILE_NAME = 'ishikawa'
session = boto3.Session()
if PROFILE_NAME in boto3.Session().available_profiles:
    session = boto3.Session(profile_name=PROFILE_NAME)
s3_client = session.client('s3')

input_bucket = 'cm-test'
input_key = 'ssbgz/customer/customer.tsv.gz'
output_bucket = 'cm-test'
output_key = 'ssbgz/customer/customer_new.tsv.gz'
condition_col = "c_custkey"
condition_val = "1"

process_s3_gzip_tsv(input_bucket, input_key, output_bucket, output_key, condition_col, condition_val)

実行例

データは、以下のようなヘッダ付きのTSV(タブ区切り)の圧縮ファイル(gzip)です。

202141012-delete-privacy-data-on-s3-1

実行結果は、以下のとおりです。

$ python csvrep.py
処理完了: 合計 3000000 レコード、フィルタリング 1 レコード、書き込み 2999999 レコード
出力ファイルサイズ: 87804002 バイト

私のMacBookで実行すると12秒程度でした。

python csvrep.py  11.73s user 0.56s system 32% cpu 38.095 total

主要な特長

  1. ストリーミング読み込み: s3_client.get_objectを使用してS3からデータをストリーミングで読み込みます。

  2. gzip解凍: gzip.GzipFileを使用して、ストリームから直接gzipファイルを解凍します。

  3. 効率的なメモリ使用: 大きなファイルを一度にメモリに読み込むのではなく、レコードごとに処理します。

  4. マルチパートアップロード: 処理済みデータを一定サイズごとにS3にアップロードし、最後にマルチパートアップロードを完了させます。

パフォーマンスと効率性

このアプローチにより、以下の利点が得られます:

  • メモリ効率: 大容量ファイルでもメモリ使用量を抑えられます。
  • ディスク効率: ローカルディスクを使用せず、直接S3とやり取りします。
  • スケーラビリティ: ファイルサイズに関係なく処理可能です。

実装上の課題

Pythonで実装したため、CPUコアが1つのみで動作するため、ファイルサイズに比例して処理時間が増加します。

但し、データファイルは最大でも1GB未満であることが多く問題にはならないことが多いでしょう。また、AWSのLambda関数などを用いて並列分散処理をすることで単位時間で対象のデータを変換することを想定しています。

まとめ

最初は、csvqというオープンソースのツールを用いることを検討しましたが、メモリに読み込んだ後処理する仕様であったので、新たにプログラムを作成するに至りました。

このスクリプトは、大容量のgzip圧縮TSVファイルを効率的に処理し、個人情報保護法の要件に対応するデータ削除を実現します。メモリとディスク使用量を最小限に抑えつつ、S3上のデータを直接処理できる柔軟な実装となっています。

今回は、レコードを削除する仕様ですが、プログラムを修正することで、データのマスキングや変更などにも応用が可能です。ご参考になれば幸いです。

合わせて読みたい

https://dev.classmethod.jp/articles/csvq-by-go/

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.